#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Test for the mergecontacts example."""

from __future__ import absolute_import

import glob
import logging
import os
import tempfile
import unittest

from apache_beam.examples.cookbook import mergecontacts
from apache_beam.testing.util import open_shards


class MergeContactsTest(unittest.TestCase):

  CONTACTS_EMAIL = '\n'.join(['Nathan Nomad\tnathan@example.com',
                              'Nicky Nomad\tnicky@example.com',
                              'Noreen Nomad\tnoreen@example.com',
                              'Noreen Nomad\tnomad@example.com',
                              'Robert B\trobert@example.com',
                              'Silviu C\tsilviu@example.com',
                              'Tom S\ttom@example.com',
                              'Wally Writer\twally@example.com',
                              ''])

  CONTACTS_PHONE = '\n'.join(['Larry Luddite\t724-228-3529',
                              'Lisa Luddite\t304-277-3504',
                              'Nathan Nomad\t412-466-8968',
                              'Nicky Nomad\t724-379-5815',
                              'Noreen Nomad\t412-472-0145',
                              'Robert B\t814-865-8799',
                              'Silviu C\t724-537-0671',
                              'Tom S\t570-368-3420',
                              'Tom S\t814-793-9655',
                              ''])

  CONTACTS_SNAILMAIL = '\n'.join(
      ['Larry Luddite\t1949 Westcott St, Detroit, MI 48222',
       'Lisa Luddite\t1949 Westcott St, Detroit, MI 48222',
       'Robert B\t601 N 34th St, Seattle, WA 98103',
       'Silviu C\t1600 Amphitheatre Pkwy, Mountain View, CA 94043',
       'Tom S\t6425 Penn Ave Ste 700, Pittsburgh, PA 15206',
       'Wally Writer\t101 Ridge Rd, Chalkyitsik, AK 99788',
       ''])

  EXPECTED_TSV = '\n'.join(
      ['\t'.join(['"Larry Luddite"', '""', '"724-228-3529"',
                  '"1949 Westcott St, Detroit, MI 48222"']),
       '\t'.join(['"Lisa Luddite"', '""', '"304-277-3504"',
                  '"1949 Westcott St, Detroit, MI 48222"']),
       '\t'.join(['"Nathan Nomad"', '"nathan@example.com"', '"412-466-8968"',
                  '""']),
       '\t'.join(['"Nicky Nomad"', '"nicky@example.com"', '"724-379-5815"',
                  '""']),
       '\t'.join(['"Noreen Nomad"', '"nomad@example.com,noreen@example.com"',
                  '"412-472-0145"', '""']),
       '\t'.join(['"Robert B"', '"robert@example.com"', '"814-865-8799"',
                  '"601 N 34th St, Seattle, WA 98103"']),
       '\t'.join(['"Silviu C"', '"silviu@example.com"', '"724-537-0671"',
                  '"1600 Amphitheatre Pkwy, Mountain View, CA 94043"']),
       '\t'.join(['"Tom S"', '"tom@example.com"', '"570-368-3420,814-793-9655"',
                  '"6425 Penn Ave Ste 700, Pittsburgh, PA 15206"']),
       '\t'.join(['"Wally Writer"', '"wally@example.com"', '""',
                  '"101 Ridge Rd, Chalkyitsik, AK 99788"']),
       ''])

  EXPECTED_STATS = '\n'.join(['2 luddites',
                              '1 writers',
                              '3 nomads',
                              ''])

  def create_temp_file(self, contents):
    with tempfile.NamedTemporaryFile(delete=False) as f:
      f.write(contents.encode('utf-8'))
      return f.name

  def normalize_tsv_results(self, tsv_data):
    """Sort .tsv file data so we can compare it with expected output."""
    lines_in = tsv_data.strip().split('\n')
    lines_out = []
    for line in lines_in:
      name, email, phone, snailmail = line.split('\t')
      lines_out.append('\t'.join(
          [name,
           '"%s"' % ','.join(sorted(email.strip('"').split(','))),
           '"%s"' % ','.join(sorted(phone.strip('"').split(','))),
           snailmail]))
    return '\n'.join(sorted(lines_out)) + '\n'

  def test_mergecontacts(self):
    path_email = self.create_temp_file(self.CONTACTS_EMAIL)
    path_phone = self.create_temp_file(self.CONTACTS_PHONE)
    path_snailmail = self.create_temp_file(self.CONTACTS_SNAILMAIL)

    result_prefix = self.create_temp_file('')

    mergecontacts.run([
        '--input_email=%s' % path_email,
        '--input_phone=%s' % path_phone,
        '--input_snailmail=%s' % path_snailmail,
        '--output_tsv=%s.tsv' % result_prefix,
        '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3))

    with open_shards('%s.tsv-*-of-*' % result_prefix) as f:
      contents = f.read()
      self.assertEqual(self.EXPECTED_TSV, self.normalize_tsv_results(contents))


if __name__ == '__main__':
  # When the file is executed directly, set the root logger to INFO before
  # handing control to the unittest runner.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()
